# !pip install librosa
import torch
import torchvision
import numpy as np
import math
from sklearn.metrics import accuracy_score
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as func
from torchvision import transforms, datasets
from torch.utils.data import SubsetRandomSampler, RandomSampler, random_split
from torch.utils.data import TensorDataset
import matplotlib.pyplot as plt
import librosa
import IPython
import soundfile as sf
from sklearn.model_selection import ShuffleSplit
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error
from google.colab import drive
drive.mount('/content/drive')
# [notebook output] Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
# !pip install librosa
# Load the clean (target) and noisy (input) training utterances and take their
# STFTs (1024-pt FFT, hop 512 -> 513 frequency bins per frame).
s, sr = librosa.load('A2_data/train_clean_male.wav', sr = None)
S_noabs = librosa.stft(s, n_fft = 1024, hop_length = 512)
s, sr = librosa.load('A2_data/train_dirty_male.wav', sr = None)
X_noabs = librosa.stft(s, n_fft = 1024, hop_length = 512)
# Transpose to (frames, freq_bins) so each row is one training example.
XT_noabs = X_noabs.T
ST_noabs = S_noabs.T
print(X_noabs.shape, S_noabs.shape)
# [notebook output] (513, 2459) (513, 2459)
# Train on magnitude spectra; the noisy phase is re-used at reconstruction time.
XT_abs = np.abs(XT_noabs)
ST_abs = np.abs(ST_noabs)
# Split sample for validation data and train data (80/20, fixed seed).
ss = ShuffleSplit(n_splits=1, test_size=0.2, random_state=100)
ss.get_n_splits(XT_abs, ST_abs)
train_index, test_index = next(ss.split(XT_abs, ST_abs))
x_train, y_train = XT_abs[train_index], ST_abs[train_index]
x_val, y_val = XT_abs[test_index], ST_abs[test_index]
tensor_xtr = torch.from_numpy(x_train) # transform to torch tensor
tensor_xval = torch.from_numpy(x_val)
tensor_ytr = torch.from_numpy(y_train) # transform to torch tensor
tensor_yval = torch.from_numpy(y_val)
print(tensor_xtr.shape, tensor_xval.shape)
# Pair (noisy frame, clean frame) examples for the DataLoaders below.
train_dataset = torch.utils.data.TensorDataset(tensor_xtr, tensor_ytr)
valid_dataset = torch.utils.data.TensorDataset(tensor_xval, tensor_yval)
# [notebook output] torch.Size([1967, 513]) torch.Size([492, 513])
# Mini-batch the training set; the whole validation set is served as one batch
# so validation RMSE is computed over all held-out frames at once.
batch_size_train = 128
batch_size_test = len(valid_dataset)
train_loader = torch.utils.data.DataLoader(dataset= train_dataset,
                                           batch_size= batch_size_train,
                                           shuffle= True)
valid_loader = torch.utils.data.DataLoader(dataset= valid_dataset,
                                           batch_size= batch_size_test,
                                           shuffle= False)
# Select the compute device.
# FIX: the original only assigned `device` inside the CUDA branch, so on a
# CPU-only machine every later `.to(device)` raised NameError. Fall back to CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
    print(f"Using {device} device")
    print(torch.cuda.get_device_name(0))
    # cuDNN disabled (original choice, kept for reproducibility of results)
    torch.backends.cudnn.enabled = False
else:
    device = torch.device('cpu')
    print(f"Using {device} device")
# Set torch random seed
torch.manual_seed(100)
np.random.seed(100)
# [notebook output] Using cuda device NVIDIA GeForce GTX 1650 Ti
# Create a fully-connected neural network - need final activation to give non-negative values
class q1_nn(nn.Module):
    """Fully connected speech denoiser (Q1).

    Maps one noisy magnitude-spectrum frame (``input_dim`` bins) to the
    predicted clean magnitude frame of the same size. A final ReLU keeps the
    output non-negative, as magnitudes must be.
    """
    def __init__(self, input_dim):
        super().__init__()
        # Generalized: layer widths now follow `input_dim` (originally the
        # parameter was ignored and 513 was hard-coded). Behavior is unchanged
        # for the existing call q1_nn(input_dim = 513).
        self.fp_input = nn.Linear(input_dim, 1024)
        self.fp1 = nn.Linear(1024, 1024)
        self.fp2 = nn.Linear(1024, 1024)
        self.fp_output = nn.Linear(1024, input_dim)
        # Defined but not used in forward(); kept so the module's attribute
        # set (and printed repr) stays the same for existing code.
        self.dropout = nn.Dropout(0.2)
        self.ReLU = nn.ReLU()

    def forward(self, input):
        """Three hidden ReLU layers; ReLU on the output for non-negativity."""
        input = self.ReLU(self.fp_input(input))
        input = self.ReLU(self.fp1(input))
        input = self.ReLU(self.fp2(input))
        # Output ReLU clamps predictions to valid (non-negative) magnitudes.
        return self.ReLU(self.fp_output(input))
# NOTE: nn.CrossEntropyLoss expects raw logits — apply softmax only AFTER (outside) the loss computation, never to the tensor fed into it. (Not relevant to this MSE regression; kept as a reminder.)
def he_weights(m):
    """He (Kaiming) uniform init for fully connected layers; zero bias."""
    if not isinstance(m, nn.Linear):
        return
    nn.init.kaiming_uniform_(m.weight, a=0, mode='fan_in', nonlinearity='relu')
    m.bias.data.fill_(0)
def normal_weights(m):
    """Small-variance Gaussian init (std 0.01) with a constant 0.01 bias."""
    if not isinstance(m, nn.Linear):
        return
    nn.init.normal_(m.weight, mean=0.0, std=0.01)
    m.bias.data.fill_(0.01)
def xavier_weights(m):
    """Xavier (Glorot) uniform init for fully connected layers; zero bias."""
    if not isinstance(m, nn.Linear):
        return
    nn.init.xavier_uniform_(m.weight)
    m.bias.data.fill_(0)
def uniform_weights(m):
    """Uniform init in [-1/sqrt(fan_in), 1/sqrt(fan_in)]; zero bias."""
    if not isinstance(m, nn.Linear):
        return
    fan_in = m.in_features
    bound = 1.0 / np.sqrt(fan_in)
    m.weight.data.uniform_(-bound, bound)
    m.bias.data.fill_(0)
# Define function to run network
def run_network(net, epochs, loss_criteria, optimizer):
    """Train `net` on the module-level `train_loader`, validating against
    `valid_loader` each epoch, with early stopping on validation RMSE.

    Uses the module-level `device`. Returns (best_model, best_state_dict).
    Stops early after 20 consecutive epochs without validation improvement.
    """
    import copy  # local import: needed only here, for weight snapshots

    train_loss_all = []
    best_performance = float('inf')  # lowest validation RMSE seen so far
    tolerance_level = 0              # epochs since the last improvement
    early_stopping_activated = 0
    epoch = 0
    MAX_MODEL = None
    MAX_PERFORMANCE_WEIGHTS = None
    while epoch <= epochs and early_stopping_activated == 0:
        train_loss = 0
        # Training pass: train() enables dropout/batch-norm training behavior.
        net.train()
        for i, (data, actual) in enumerate(train_loader):
            if torch.cuda.is_available():
                data, actual = data.to(device), actual.to(device)
            output = net(data)
            loss = loss_criteria(output, actual)
            optimizer.zero_grad()  # reset gradients
            loss.backward()
            optimizer.step()
            train_loss += loss.item()
        train_loss_all.append(train_loss)
        # Validation pass, gradients disabled.
        net.eval()
        actual_values_all = []
        pred_values_all = []
        with torch.no_grad():
            for j, (data, actual) in enumerate(valid_loader):
                if torch.cuda.is_available():
                    data, actual = data.to(device), actual.to(device)
                val_preds = net.forward(data)
                actual_values_all.append(actual.cpu().numpy())
                pred_values_all.append(val_preds.cpu().numpy())
        all_preds = np.concatenate(pred_values_all, axis=0)
        all_targets = np.concatenate(actual_values_all, axis=0)
        rmse_val = np.sqrt(mean_squared_error(all_targets, all_preds))
        if epoch % 10 == 0:
            print(f'Epoch {epoch} \t\t Epoch Training loss: {train_loss} \t\t Validation RMSE {rmse_val}')
        # Early-stopping bookkeeping.
        if rmse_val < best_performance:
            best_performance = rmse_val
            tolerance_level = 0
            # BUG FIX: net.state_dict() returns references to the live
            # parameter tensors, so without deepcopy the "saved best" weights
            # silently keep tracking the latest (possibly worse) parameters.
            MAX_PERFORMANCE_WEIGHTS = copy.deepcopy(net.state_dict())
            MAX_MODEL = net
        else:
            tolerance_level += 1
            if tolerance_level >= 20:
                early_stopping_activated = 1
                print('Early Stopping activated - no improvement in validation mse for the past 20 epochs. Using model stage before 20 epochs for further use.')
        epoch += 1
    print('Best Performance on Validation set achived till now :', best_performance)
    return MAX_MODEL, MAX_PERFORMANCE_WEIGHTS
# Build, initialize (Xavier), and train the fully connected Q1 model.
from sklearn.metrics import mean_squared_error
net = None
net = q1_nn(input_dim = 513).to(device)
net.apply(xavier_weights)
optimizer = torch.optim.Adam(net.parameters(), lr = 0.001)
loss_func = nn.MSELoss()
q1_net, q1_weights = run_network(net, epochs = 200, loss_criteria = loss_func, optimizer = optimizer)
# [notebook output] Epoch 0 Epoch Training loss: 0.9227895438671112 Validation RMSE 0.16919326782226562 Epoch 10 Epoch Training loss: 0.13572634477168322 Validation RMSE 0.0958026796579361 Epoch 20 Epoch Training loss: 0.10608404967933893 Validation RMSE 0.09080462902784348 Epoch 30 Epoch Training loss: 0.07544987415894866 Validation RMSE 0.08334947377443314 Epoch 40 Epoch Training loss: 0.07257696939632297 Validation RMSE 0.0868820920586586 Epoch 50 Epoch Training loss: 0.07092355354689062 Validation RMSE 0.08432945609092712 Early Stopping activated - no improvement in validation mse for the past 20 epochs. Using model stage before 20 epochs for further use. Best Performance on Validation set achived till now : 0.08164829
# EXPERT ADVICE..We get 12 SNR for lr = 0.0001
IPython.display.Audio("A2_data/test_x_01.wav")
# Load test clip 1 and take STFT magnitudes, (frames, 513), as model input.
s, sr_test = librosa.load('A2_data/test_x_01.wav', sr = None)
S_test = librosa.stft(s, n_fft = 1024, hop_length = 512)
ST_test = S_test.T
ST_test_abs = np.abs(ST_test)
tensor_test = torch.tensor(ST_test_abs)
print('Test data shape ', tensor_test.shape)
test_dataset = torch.utils.data.TensorDataset(tensor_test)
test_loader = torch.utils.data.DataLoader(dataset=test_dataset,
                                          batch_size=len(test_dataset),
                                          shuffle=False)
# Denoise: the whole clip goes through the trained network as one batch.
q1_net.eval()
with torch.no_grad():
    for i, (data) in enumerate(test_loader):
        data = data[0].to(device)
        test_preds = q1_net.forward(data)
test_preds = test_preds.T
print(test_preds.shape)
# Reapply the noisy signal's phase (S/|S|) to the predicted magnitudes,
# invert the STFT, and write the reconstructed waveform.
Preds_noabs = np.multiply( np.divide(S_test, np.abs(S_test)), np.abs(test_preds.cpu().numpy()) )
S_hat_test = librosa.istft(stft_matrix = Preds_noabs, hop_length = 512)
sf.write('q1_test_s_01_recons.wav', S_hat_test, sr_test)
# [notebook output] Test data shape torch.Size([142, 513]) torch.Size([513, 142])
IPython.display.Audio("q1_test_s_01_recons.wav")
IPython.display.Audio("A2_data/test_x_02.wav")
# Same Q1 denoising pipeline for test clip 2.
s, sr_test = librosa.load('A2_data/test_x_02.wav', sr = None)
S_test = librosa.stft(s, n_fft = 1024, hop_length = 512)
ST_test = S_test.T
ST_test_abs = np.abs(ST_test)
tensor_test = torch.tensor(ST_test_abs)
print('Test data shape ', tensor_test.shape)
test_dataset = torch.utils.data.TensorDataset(tensor_test)
test_loader = torch.utils.data.DataLoader(dataset=test_dataset,
                                          batch_size=len(test_dataset),
                                          shuffle=False)
q1_net.eval()
with torch.no_grad():
    for i, (data) in enumerate(test_loader):
        data = data[0].to(device)
        test_preds = q1_net.forward(data)
test_preds = test_preds.T
print(test_preds.shape)
# Reapply noisy phase to predicted magnitudes, invert the STFT, and save.
Preds_noabs = np.multiply( np.divide(S_test, np.abs(S_test)), np.abs(test_preds.cpu().numpy()) )
S_hat_test = librosa.istft(stft_matrix = Preds_noabs, hop_length = 512)
sf.write('q1_test_s_02_recons.wav', S_hat_test, sr_test)
IPython.display.Audio("q1_test_s_02_recons.wav")
# [notebook output] Test data shape torch.Size([380, 513]) torch.Size([513, 380])
# get clean speech from validation data
q1_net.eval()
with torch.no_grad():
    for i, (data) in enumerate(valid_loader):
        data = data[0].to(device)
        valid_preds = q1_net.forward(data)
valid_preds = valid_preds.T.cpu().numpy()
# get x_test - this is the unclean or dirty or input speech
x_test = X_noabs.T[test_index].T
x_test_abs = np.abs(x_test)
# Reapply the noisy phase to the predicted magnitudes, invert to time domain.
valid_noabs = np.multiply( np.divide(x_test, x_test_abs), np.abs(valid_preds) )
valid_noabs_timedom = librosa.istft(stft_matrix = valid_noabs, hop_length = 512)
# Now get the time domain of clean actual speech of test_index
s_test = S_noabs.T[test_index].T
y_val_timedom = librosa.istft(stft_matrix = s_test, hop_length = 512)
# SNR = 10*log10(||clean||^2 / ||clean - estimate||^2); 1e-20 guards log(0).
num = np.dot(y_val_timedom, y_val_timedom)
den = np.dot((y_val_timedom - valid_noabs_timedom),(y_val_timedom - valid_noabs_timedom))
print('SNR score on validation data: ', 10*math.log10( (num/den) + 1e-20 ))
# [notebook output] SNR score on validation data:  10.538691356618084
# Save this model:
# torch.save(q1_weights, 'A2_Final Model Q1')
# ---- Q2: rebuild the data pipeline (same steps as Q1) ----
s, sr = librosa.load('A2_data/train_clean_male.wav', sr = None)
S_noabs = librosa.stft(s, n_fft = 1024, hop_length = 512)
s, sr = librosa.load('A2_data/train_dirty_male.wav', sr = None)
X_noabs = librosa.stft(s, n_fft = 1024, hop_length = 512)
XT_noabs = X_noabs.T
ST_noabs = S_noabs.T
XT_abs = np.abs(XT_noabs)
ST_abs = np.abs(ST_noabs)
# 80/20 frame split with a fixed seed for reproducibility.
ss = ShuffleSplit(n_splits=1, test_size=0.2, random_state=100, train_size = 0.8)
ss.get_n_splits(XT_abs, ST_abs)
train_index, test_index = next(ss.split(XT_abs, ST_abs))
x_train, y_train = XT_abs[train_index], ST_abs[train_index]
x_val, y_val = XT_abs[test_index], ST_abs[test_index]
tensor_xtr = torch.from_numpy(x_train) # transform to torch tensor
tensor_xval = torch.from_numpy(x_val)
tensor_ytr = torch.from_numpy(y_train) # transform to torch tensor
tensor_yval = torch.from_numpy(y_val)
train_dataset = torch.utils.data.TensorDataset(tensor_xtr, tensor_ytr)
valid_dataset = torch.utils.data.TensorDataset(tensor_xval, tensor_yval)
# Whole validation set served as a single batch.
batch_size_train = 128
batch_size_test = len(valid_dataset)
train_loader = torch.utils.data.DataLoader(dataset= train_dataset,
                                           batch_size= batch_size_train,
                                           shuffle= True)
valid_loader = torch.utils.data.DataLoader(dataset= valid_dataset,
                                           batch_size= batch_size_test,
                                           shuffle= False)
# Get device
# NOTE(review): if CUDA is unavailable, `device` is never set here and later
# `.to(device)` calls fail — a CPU fallback branch would fix this.
if(torch.cuda.is_available()):
    device = torch.device('cuda')
    print(f"Using {device} device")
    print(torch.cuda.get_device_name(0))
    torch.backends.cudnn.enabled = False
# Set torch random seed
torch.manual_seed(100)
np.random.seed(100)
# [notebook output] Using cuda device NVIDIA GeForce GTX 1650 Ti
def he_weights(m):
    """He (Kaiming) uniform init for Linear and Conv1d layers; zero bias."""
    # A single tuple isinstance replaces the original if/elif pair — both
    # branches applied the identical initialization.
    if isinstance(m, (nn.Linear, nn.Conv1d)):
        nn.init.kaiming_uniform_(m.weight, a=0, mode='fan_in', nonlinearity='relu')
        m.bias.data.fill_(0)
def normal_weights(m):
    """Gaussian init (mean 0, std 0.01) for Linear layers; bias set to 0.01."""
    if not isinstance(m, nn.Linear):
        return
    nn.init.normal_(m.weight, mean=0.0, std=0.01)
    m.bias.data.fill_(0.01)
def xavier_weights(m):
    """Xavier (Glorot) uniform init for Linear and Conv1d layers; zero bias."""
    # Both original branches were identical, so one tuple isinstance suffices.
    if isinstance(m, (nn.Linear, nn.Conv1d)):
        nn.init.xavier_uniform_(m.weight)
        m.bias.data.fill_(0)
def uniform_weights(m):
    """Uniform init in [-1/sqrt(fan_in), 1/sqrt(fan_in)] for Linear layers."""
    if not isinstance(m, nn.Linear):
        return
    bound = 1.0 / np.sqrt(m.in_features)
    m.weight.data.uniform_(-bound, bound)
    m.bias.data.fill_(0)
# Create convolution network
# Create a fully-connected neural network - need final activation to give non-negative values
class q2_nn(nn.Module):
    """1-D CNN denoiser (Q2): one noisy magnitude frame in, one clean
    magnitude frame out. Conv1d -> MaxPool1d -> two fully connected layers,
    with a final ReLU so predicted magnitudes stay non-negative.
    """
    def __init__(self, input_dim):
        super().__init__()
        # Architecture hyper-parameters (kept as attributes, as before).
        self.output_channels = 128
        self.conv_kernel_size = 3
        self.pool_kernel_size = 3
        self.conv_stride = 1
        self.pool_stride = 1
        self.conv_padding = 0
        self.pool_padding = 0
        self.inputwidth_conv = input_dim
        # 128 kernels of width 3, no zero padding ('valid').
        self.conv_layer1 = nn.Conv1d(in_channels=1, out_channels=self.output_channels,
                                     padding='valid', kernel_size=self.conv_kernel_size,
                                     stride=self.conv_stride)
        self.max_pool1 = nn.MaxPool1d(kernel_size=self.pool_kernel_size,
                                      stride=self.pool_stride, padding=self.pool_padding)
        # Size of the flattened conv/pool output feeds the first dense layer.
        conv1_size_op, pool1_size_op = self.calculate_size()
        flatten_size = self.output_channels * int(pool1_size_op)
        self.fp_input = nn.Linear(int(flatten_size), 1024)
        self.fp_output = nn.Linear(1024, 513)
        self.dropout = nn.Dropout(0.2)  # unused in forward(), kept for parity
        self.ReLU = nn.ReLU()

    def calculate_size(self):
        """Output lengths after conv1 and pool1 via [(W - K + 2P)/S] + 1."""
        def out_len(width, kernel, stride, pad):
            return ((width - (kernel - 1) - 1 + 2 * pad) / stride) + 1
        conv_out = out_len(self.inputwidth_conv, self.conv_kernel_size,
                           self.conv_stride, self.conv_padding)
        pool_out = out_len(conv_out, self.pool_kernel_size,
                           self.pool_stride, self.pool_padding)
        return conv_out, pool_out

    def forward(self, input):
        """Forward pass for a (batch, 1, input_dim) tensor -> (batch, 513)."""
        x = self.ReLU(self.conv_layer1(input))
        x = self.max_pool1(x)
        # Flatten everything except the batch dimension for the dense layers.
        x = torch.flatten(x, start_dim=1)
        x = self.ReLU(self.fp_input(x))
        # Final ReLU keeps the output non-negative (valid magnitudes).
        return self.ReLU(self.fp_output(x))
# Define function to run network
# Useful: https://discuss.pytorch.org/t/input-form-of-conv1d/153775
def run_network(net, epochs, loss_criteria, optimizer):
    """Train the Conv1d model (Q2) with early stopping on validation RMSE.

    Uses the module-level `train_loader`, `valid_loader`, and `device`.
    Returns (best_model, best_state_dict). Stops early after 50 consecutive
    epochs with no validation improvement.
    """
    import copy  # local import: needed only for weight snapshots

    train_loss_all = []
    best_performance = float('inf')  # lowest validation RMSE seen so far
    tolerance_level = 0              # epochs since the last improvement
    early_stopping_activated = 0
    epoch = 0
    MAX_MODEL = None
    MAX_PERFORMANCE_WEIGHTS = None
    while epoch <= epochs and early_stopping_activated == 0:
        train_loss = 0
        net.train()
        for i, (data, actual) in enumerate(train_loader):
            # Conv1d expects (batch, channels, length): insert a channel axis.
            data = data[:, None, :]
            if torch.cuda.is_available():
                data, actual = data.to(device), actual.to(device)
            output = net(data)
            loss = loss_criteria(output, actual)
            optimizer.zero_grad()  # reset gradients
            loss.backward()
            optimizer.step()
            train_loss += loss.item()
        train_loss_all.append(train_loss)
        # Validation pass, gradients disabled.
        net.eval()
        actual_values_all = []
        pred_values_all = []
        with torch.no_grad():
            for j, (data, actual) in enumerate(valid_loader):
                data = data[:, None, :]
                if torch.cuda.is_available():
                    data, actual = data.to(device), actual.to(device)
                val_preds = net.forward(data)
                actual_values_all.append(actual.cpu().numpy())
                pred_values_all.append(val_preds.cpu().numpy())
        all_preds = np.concatenate(pred_values_all, axis=0)
        all_targets = np.concatenate(actual_values_all, axis=0)
        rmse_val = np.sqrt(mean_squared_error(all_targets, all_preds))
        if epoch % 10 == 0:
            print(f'Epoch {epoch} \t\t Epoch Training loss: {train_loss} \t\t Validation RMSE {rmse_val}')
        # Early-stopping bookkeeping.
        if rmse_val < best_performance:
            best_performance = rmse_val
            tolerance_level = 0
            # BUG FIX: net.state_dict() returns references to live tensors;
            # without deepcopy the "best" snapshot tracks later, worse weights.
            MAX_PERFORMANCE_WEIGHTS = copy.deepcopy(net.state_dict())
            MAX_MODEL = net
        else:
            tolerance_level += 1
            if tolerance_level >= 50:
                early_stopping_activated = 1
                # BUG FIX: message claimed 20 epochs, but the patience is 50.
                print('Early Stopping activated - no improvement in validation mse for the past 50 epochs. Using model stage before 50 epochs for further use.')
        epoch += 1
    print('Best Performance on Validation set achived till now :', best_performance)
    return MAX_MODEL, MAX_PERFORMANCE_WEIGHTS
# Build, initialize (Xavier), and train the Conv1d Q2 model.
# EXPERT ADVICE: WHEN VALIDATION LOSS NEITHER INCREASES NOR DECREASES, REDUCE YOUR LR
from sklearn.metrics import mean_squared_error
net = None
net = q2_nn(input_dim = 513).to(device)
net.apply(xavier_weights) #he w stuck in 0.27
print(net)
# lr=0.01 left validation loss stuck in a local minimum; 0.001 trains fine.
optimizer = torch.optim.Adam(net.parameters(), lr = 0.001)
loss_func = nn.MSELoss()
q2_net, q2_weights = run_network(net, epochs = 200, loss_criteria = loss_func, optimizer = optimizer)
# [notebook output] q2_nn( (conv_layer1): Conv1d(1, 128, kernel_size=(3,), stride=(1,), padding=valid) (max_pool1): MaxPool1d(kernel_size=3, stride=1, padding=0, dilation=1, ceil_mode=False) (fp_input): Linear(in_features=65152, out_features=1024, bias=True) (fp_output): Linear(in_features=1024, out_features=513, bias=True) (dropout): Dropout(p=0.2, inplace=False) (ReLU): ReLU() ) Epoch 0 Epoch Training loss: 1.6335269324481487 Validation RMSE 0.20150774717330933 Epoch 10 Epoch Training loss: 0.276150681078434 Validation RMSE 0.12120294570922852 Epoch 20 Epoch Training loss: 0.1434810096397996 Validation RMSE 0.09980492293834686 Epoch 30 Epoch Training loss: 0.12410274846479297 Validation RMSE 0.09504435211420059 Epoch 40 Epoch Training loss: 0.11086217034608126 Validation RMSE 0.09416526556015015 Epoch 50 Epoch Training loss: 0.0986250767018646 Validation RMSE 0.08926770091056824 Epoch 60 Epoch Training loss: 0.08069428894668818 Validation RMSE 0.08559560775756836 Epoch 70 Epoch Training loss: 0.07247901731170714 Validation RMSE 0.08471028506755829 Epoch 80 Epoch Training loss: 0.06369857513345778 Validation RMSE 0.08382301032543182 Epoch 90 Epoch Training loss: 0.06141321919858456 Validation RMSE 0.08328167349100113 Epoch 100 Epoch Training loss: 0.05885017057880759 Validation RMSE 0.08340314775705338 Epoch 110 Epoch Training loss: 0.05193002778105438 Validation RMSE 0.08191866427659988 Epoch 120 Epoch Training loss: 0.05128859472461045 Validation RMSE 0.0826929584145546 Epoch 130 Epoch Training loss: 0.054226801032200456 Validation RMSE 0.08199629932641983 Epoch 140 Epoch Training loss: 0.053428928717039526 Validation RMSE 0.08279094099998474 Epoch 150 Epoch Training loss: 0.0475146429380402 Validation RMSE 0.0818198099732399 Epoch 160 Epoch Training loss: 0.0519395787268877 Validation RMSE 0.0826316848397255 Epoch 170 Epoch Training loss: 0.04644995729904622 Validation RMSE 0.08134890347719193 Epoch 180 Epoch Training loss: 0.04507821693550795 Validation RMSE 0.08233833312988281
# [notebook output] Epoch 190 Epoch Training loss: 0.04143244028091431 Validation RMSE 0.0809660330414772 Epoch 200 Epoch Training loss: 0.04482298588845879 Validation RMSE 0.08192820101976395 Best Performance on Validation set achived till now : 0.08082839
IPython.display.Audio("A2_data/test_x_01.wav")
# Q2: denoise test clip 1 with the Conv1d model.
s, sr_test = librosa.load('A2_data/test_x_01.wav', sr = None)
S_test = librosa.stft(s, n_fft = 1024, hop_length = 512)
ST_test = S_test.T
ST_test_abs = np.abs(ST_test)
tensor_test = torch.tensor(ST_test_abs)
print('Test data shape ', tensor_test.shape)
test_dataset = torch.utils.data.TensorDataset(tensor_test)
test_loader = torch.utils.data.DataLoader(dataset=test_dataset,
                                          batch_size=len(test_dataset),
                                          shuffle=False)
q2_net.eval()
with torch.no_grad():
    for i, (data) in enumerate(test_loader):
        # Conv1d needs (batch, 1, 513): insert a channel axis.
        data = data[0][:, None, :]
        data = data.to(device)
        test_preds = q2_net.forward(data)
test_preds = test_preds.T
print(test_preds.shape)
# Reapply the noisy phase (S/|S|), invert the STFT, and save.
Preds_noabs = np.multiply( np.divide(S_test, np.abs(S_test)), np.abs(test_preds.cpu().numpy()) )
S_hat_test = librosa.istft(stft_matrix = Preds_noabs, hop_length = 512)
sf.write('q2_test_s_01_recons.wav', S_hat_test, sr_test)
IPython.display.Audio("q2_test_s_01_recons.wav")
# [notebook output] Test data shape torch.Size([142, 513]) torch.Size([513, 142])
IPython.display.Audio("A2_data/test_x_02.wav")
# Q2: denoise test clip 2 with the Conv1d model.
s, sr_test = librosa.load('A2_data/test_x_02.wav', sr = None)
S_test = librosa.stft(s, n_fft = 1024, hop_length = 512)
ST_test = S_test.T
ST_test_abs = np.abs(ST_test)
tensor_test = torch.tensor(ST_test_abs)
print('Test data shape ', tensor_test.shape)
test_dataset = torch.utils.data.TensorDataset(tensor_test)
test_loader = torch.utils.data.DataLoader(dataset=test_dataset,
                                          batch_size=len(test_dataset),
                                          shuffle=False)
q2_net.eval()
with torch.no_grad():
    for i, (data) in enumerate(test_loader):
        # Conv1d needs (batch, 1, 513): insert a channel axis.
        data = data[0][:, None, :]
        data = data.to(device)
        test_preds = q2_net.forward(data)
test_preds = test_preds.T
print(test_preds.shape)
# Reapply the noisy phase (S/|S|), invert the STFT, and save.
Preds_noabs = np.multiply( np.divide(S_test, np.abs(S_test)), np.abs(test_preds.cpu().numpy()) )
S_hat_test = librosa.istft(stft_matrix = Preds_noabs, hop_length = 512)
sf.write('q2_test_s_02_recons.wav', S_hat_test, sr_test)
IPython.display.Audio("q2_test_s_02_recons.wav")
# [notebook output] Test data shape torch.Size([380, 513]) torch.Size([513, 380])
# Q2: reconstruct the validation split and measure SNR against the clean signal.
q2_net.eval()
with torch.no_grad():
    for j, (data, actual) in enumerate(valid_loader):
        data = data[:, None, :]  # (batch, 1, 513) channel axis for Conv1d
        # FIX: the original moved `data` to the device twice (once inside a
        # CUDA check, then unconditionally); a single transfer suffices.
        data = data.to(device)
        valid_preds = q2_net.forward(data)
valid_preds = valid_preds.T.cpu().numpy()
# get x_test - this is the unclean or dirty or input speech
x_test = X_noabs.T[test_index].T
x_test_abs = np.abs(x_test)
# Reapply the noisy phase to the predicted magnitudes, invert to time domain.
valid_noabs = np.multiply( np.divide(x_test, x_test_abs), np.abs(valid_preds) )
valid_noabs_timedom = librosa.istft(stft_matrix = valid_noabs, hop_length = 512)
# Time-domain clean reference for the same validation frames.
s_test = S_noabs.T[test_index].T
y_val_timedom = librosa.istft(stft_matrix = s_test, hop_length = 512)
# SNR = 10*log10(||clean||^2 / ||clean - estimate||^2); 1e-20 guards log(0).
num = np.dot(y_val_timedom, y_val_timedom)
den = np.dot((y_val_timedom - valid_noabs_timedom),(y_val_timedom - valid_noabs_timedom))
print('SNR for this validation audio data', 10*math.log10( (num/den) + 1e-20 ))
# [notebook output] SNR for this validation audio data 11.053617383964738
# Save this model:
# torch.save(q2_weights, 'A2_Final Model Q2')
# ---- Q3: reload the clean/noisy training pair (Drive paths this time) ----
s, sr = librosa.load('/content/drive/My Drive/Projects and research stuffs/DLS Assignments/A2_data/train_clean_male.wav', sr = None)
S_noabs = librosa.stft(s, n_fft = 1024, hop_length = 512)
s, sr = librosa.load('/content/drive/My Drive/Projects and research stuffs/DLS Assignments/A2_data/train_dirty_male.wav', sr = None)
X_noabs = librosa.stft(s, n_fft = 1024, hop_length = 512)
XT_noabs = X_noabs.T
ST_noabs = S_noabs.T
XT_abs = np.abs(XT_noabs)
ST_abs = np.abs(ST_noabs)
XT_abs.shape
# [notebook output] (2459, 513)
# Q3: build overlapping context windows along the time axis.
# Each input example is a (n_frames, 513) stack of consecutive noisy frames;
# the target is the single clean frame that immediately follows the window
# (frames 0..n_frames-1 and 1..n_frames predict frames n_frames, n_frames+1, ...).
# The first n_frames frames have no full history, so they produce no targets.
n_frames = 20
total_data = XT_abs.shape[0] - n_frames
X_data = []
Y_data = []
for i in range(total_data):
    # FIX (idiom): dropped the trailing `[:]` no-op slices — np.array() below
    # copies the data anyway, so the result is unchanged.
    X_data.append(XT_abs[i:i + n_frames])
    Y_data.append(ST_abs[i + n_frames])
X_data = np.array(X_data)
Y_data = np.array(Y_data)
# Split windowed examples: 80% train, 10% validation (with train_size=0.8 and
# test_size=0.1, the remaining 10% of examples are simply unused).
ss = ShuffleSplit(n_splits=1, test_size=0.1, random_state=100, train_size = 0.8)
ss.get_n_splits(X_data, Y_data) # these will have index correctly
train_index, test_index = next(ss.split(X_data, Y_data))
x_train, y_train = X_data[train_index], Y_data[train_index]
x_val, y_val = X_data[test_index], Y_data[test_index]
tensor_xtr = torch.from_numpy(x_train) # transform to torch tensor
tensor_xval = torch.from_numpy(x_val)
tensor_ytr = torch.from_numpy(y_train) # transform to torch tensor
tensor_yval = torch.from_numpy(y_val)
print(tensor_xtr.shape, tensor_yval.shape)
train_dataset = torch.utils.data.TensorDataset(tensor_xtr, tensor_ytr)
valid_dataset = torch.utils.data.TensorDataset(tensor_xval, tensor_yval)
# Whole validation set served in a single batch.
batch_size_train = 128
batch_size_test = len(valid_dataset)
train_loader = torch.utils.data.DataLoader(dataset= train_dataset,
                                           batch_size= batch_size_train,
                                           shuffle= True)
valid_loader = torch.utils.data.DataLoader(dataset= valid_dataset,
                                           batch_size= batch_size_test,
                                           shuffle= False)
# [notebook output] torch.Size([1951, 20, 513]) torch.Size([244, 513])
# Quick sanity check that a GPU is visible in this runtime.
torch.cuda.is_available()
# [notebook output] True
# Select the compute device.
# FIX: the original only assigned `device` inside the CUDA branch, so on a
# CPU-only machine every later `.to(device)` raised NameError. Fall back to CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
    print(f"Using {device} device")
    print(torch.cuda.get_device_name(0))
    # cuDNN disabled (original choice, kept for reproducibility of results)
    torch.backends.cudnn.enabled = False
else:
    device = torch.device('cpu')
    print(f"Using {device} device")
# Set torch random seed
torch.manual_seed(100)
np.random.seed(100)
def he_weights(m):
    """He (Kaiming) uniform init for Linear and Conv2d layers; zero bias."""
    # One tuple isinstance replaces the original if/elif pair — both branches
    # applied the identical initialization.
    if isinstance(m, (nn.Linear, nn.Conv2d)):
        nn.init.kaiming_uniform_(m.weight, a=0, mode='fan_in', nonlinearity='relu')
        m.bias.data.fill_(0)
def xavier_weights(m):
    """Xavier (Glorot) uniform init for Linear and Conv2d layers; zero bias."""
    if isinstance(m, (nn.Linear, nn.Conv2d)):
        nn.init.xavier_uniform_(m.weight)
        m.bias.data.fill_(0)
# [notebook output] Using cuda device Tesla T4
# Create convolution network
# Create a fully-connected neural network - need final activation to give non-negative values
class q3_nn(nn.Module):
    """2-D CNN denoiser (Q3).

    Input is a (batch, 1, 20, input_dim) stack of 20 consecutive noisy
    magnitude frames; output is the predicted clean magnitude frame
    (batch, 513). A final ReLU keeps predicted magnitudes non-negative.
    """
    def __init__(self, input_dim):
        super().__init__()
        self.output_channels = 32
        self.conv_kernel_size = 3
        self.pool_kernel_size = 3
        self.conv_stride = 1
        self.pool_stride = 2
        # conv_padding=1 matches padding='same' for a 3x3 kernel at stride 1,
        # so calculate_size() agrees with the actual conv output.
        self.conv_padding = 1
        self.pool_padding = 1
        self.inputwidth_conv = input_dim
        self.inputheight_conv = 20  # number of stacked context frames
        self.conv_layer1 = nn.Conv2d(in_channels = 1, out_channels = self.output_channels, padding = 'same' , kernel_size = (self.conv_kernel_size,self.conv_kernel_size), stride = self.conv_stride)
        self.max_pool1 = nn.MaxPool2d(kernel_size = (self.pool_kernel_size, self.pool_kernel_size), stride = self.pool_stride, padding = self.pool_padding, dilation=1)
        # Flattened conv/pool output size feeds the first dense layer.
        pool1_height, pool1_width = self.calculate_size()
        flatten_size = self.output_channels * int(pool1_height) * int(pool1_width)
        self.fp_input = nn.Linear(int(flatten_size), 2048)
        self.fp_output = nn.Linear(2048, 513)
        self.dropout = nn.Dropout(0.2)
        self.ReLU = nn.ReLU()

    def calculate_size(self):
        """Spatial (height, width) after conv1 + pool1 via [(W - K + 2P)/S] + 1."""
        conv1_height = ((self.inputheight_conv - (self.conv_kernel_size - 1) - 1 + 2 * self.conv_padding) / self.conv_stride) + 1
        conv1_width = ((self.inputwidth_conv - (self.conv_kernel_size - 1) - 1 + 2 * self.conv_padding) / self.conv_stride) + 1
        pool2_height = ((conv1_height - (self.pool_kernel_size - 1) - 1 + 2 * self.pool_padding) / self.pool_stride) + 1
        pool2_width = ((conv1_width - (self.pool_kernel_size - 1) - 1 + 2 * self.pool_padding) / self.pool_stride) + 1
        return pool2_height, pool2_width

    def forward(self, input):
        """Conv -> pool -> flatten -> dense (with dropout) -> dense, ReLU output.

        FIX: removed the torch.cuda.empty_cache() calls that were sprinkled
        through the forward pass — each one forces an expensive device
        synchronization on every batch and cannot free tensors that are still
        referenced, so they only slowed training down.
        """
        input = self.conv_layer1(input)
        input = self.ReLU(input)
        input = self.max_pool1(input)
        # Flatten everything except the batch dimension for the dense layers.
        input = torch.flatten(input, start_dim = 1)
        input = self.fp_input(input)
        input = self.dropout(input)
        input = self.ReLU(input)
        input = self.fp_output(input)
        # Final ReLU clamps the output to valid (non-negative) magnitudes.
        input = self.ReLU(input)
        return input
# Define function to run network
# Useful: https://discuss.pytorch.org/t/input-form-of-conv1d/153775
def run_network(net, epochs, loss_criteria, optimizer):
    """Train `net` with early stopping on validation RMSE.

    Runs epochs 0..epochs inclusive (the original `<=` loop condition is kept —
    the printed logs show an "Epoch 200" pass for epochs=200). After every epoch
    the model is evaluated on `valid_loader`; if validation RMSE has not
    improved for 50 consecutive epochs, training stops early.

    Parameters:
        net           -- model to train (already moved to `device` by the caller)
        epochs        -- maximum epoch index (inclusive)
        loss_criteria -- loss function, e.g. nn.MSELoss()
        optimizer     -- torch.optim optimizer over net.parameters()

    Returns:
        (MAX_MODEL, MAX_PERFORMANCE_WEIGHTS) — the live model object and a
        DEEP COPY of the weights at the best validation RMSE. Restore the best
        state with net.load_state_dict(MAX_PERFORMANCE_WEIGHTS).

    NOTE(review): relies on the notebook-level globals `train_loader`,
    `valid_loader` and `device` defined elsewhere in this file.
    """
    import copy  # local import: used to snapshot the best weights

    PATIENCE = 50  # epochs without improvement before early stopping triggers
    train_loss_all = []
    best_performance = float('inf')  # lowest validation RMSE seen so far
    tolerance_level = 0              # epochs since the last improvement
    early_stopping_activated = 0
    epoch = 0
    MAX_MODEL = None
    MAX_PERFORMANCE_WEIGHTS = None
    while( epoch <= epochs and early_stopping_activated == 0 ):
        train_loss = 0
        # Training pass — net.train() enables dropout.
        net.train()
        for i, (data, actual) in enumerate(train_loader):
            torch.cuda.empty_cache()
            # Insert a channel axis: (batch, 20, 513) -> (batch, 1, 20, 513) for Conv2d.
            data = data[:, None, :, :]
            if(torch.cuda.is_available()):
                data, actual = data.to(device), actual.to(device)
            output = net(data)
            torch.cuda.empty_cache()
            loss = loss_criteria(output, actual)
            optimizer.zero_grad() # reset gradients
            loss.backward()
            optimizer.step()
            torch.cuda.empty_cache()
            # Accumulate the epoch's total training loss.
            train_loss += loss.item()
        train_loss_all.append(train_loss)
        # Validation pass — net.eval() disables dropout; no_grad saves memory.
        net.eval()
        actual_values_all = []
        pred_values_all = []
        with torch.no_grad():
            for j, (data, actual) in enumerate(valid_loader):
                torch.cuda.empty_cache()
                data = data[:, None, :, :]
                if(torch.cuda.is_available()):
                    data, actual = data.to(device), actual.to(device)
                val_preds = net(data)
                torch.cuda.empty_cache()
                actual_values_all.append(actual.cpu().numpy())
                pred_values_all.append(val_preds.cpu().numpy())
        all_preds = np.concatenate(pred_values_all, axis=0)
        all_targets = np.concatenate(actual_values_all, axis=0)
        rmse_val = np.sqrt(mean_squared_error(all_targets, all_preds))
        if(epoch % 10 == 0):
            print(f'Epoch {epoch} \t\t Epoch Training loss: {train_loss} \t\t Validation RMSE {rmse_val}')
        # Early-stopping bookkeeping.
        if(rmse_val < best_performance):
            best_performance = rmse_val
            tolerance_level = 0
            # BUG FIX: state_dict() returns references to the live parameter
            # tensors, so without a deep copy the "best" snapshot silently
            # mutates as training continues and ends up equal to the final
            # weights. Deep-copy so the best state is actually preserved.
            MAX_PERFORMANCE_WEIGHTS = copy.deepcopy(net.state_dict())
            MAX_MODEL = net  # note: the live model object, not a frozen copy
        else:
            tolerance_level += 1
            if(tolerance_level >= PATIENCE):
                early_stopping_activated = 1
                # Message fixed: the patience is 50 epochs, not 20 as the
                # original text claimed.
                print('Early Stopping activated - no improvement in validation mse for the past 50 epochs. Using model stage before 50 epochs for further use.')
        epoch += 1
        torch.cuda.empty_cache()
    print('Best Performance on Validation set achived till now :', best_performance)
    return MAX_MODEL, MAX_PERFORMANCE_WEIGHTS
# First create model and then run with function
# EXPERT ADVICE: WHEN VALIDATION LOSS NEITHER INCREASES NOR DECREASES, REDUCE YOUR LR
# Build the CNN (513 = frequency bins per STFT frame) and train for up to 200 epochs.
net = None  # drop any previous model reference so its memory can be reclaimed
net = q3_nn(input_dim = 513).to(device)
net.apply(xavier_weights)  # Xavier init — `xavier_weights` is defined elsewhere in this notebook
torch.cuda.empty_cache()
print(net)
torch.cuda.empty_cache()
optimizer = torch.optim.Adam(net.parameters(), lr = 0.001)
torch.cuda.empty_cache()
loss_func = nn.MSELoss()  # regression on spectrogram magnitudes
torch.cuda.empty_cache()
# run_network returns the model and the weights at the best validation RMSE.
q3_net, q3_weights = run_network(net, epochs = 200, loss_criteria = loss_func, optimizer = optimizer)
q3_nn( (conv_layer1): Conv2d(1, 32, kernel_size=(3, 3), stride=(1, 1), padding=same) (max_pool1): MaxPool2d(kernel_size=(3, 3), stride=2, padding=1, dilation=1, ceil_mode=False) (fp_input): Linear(in_features=82240, out_features=4096, bias=True) (fp_output): Linear(in_features=4096, out_features=513, bias=True) (dropout): Dropout(p=0.2, inplace=False) (ReLU): ReLU() ) Epoch 0 Epoch Training loss: 1.5102691985666752 Validation RMSE 0.2849431335926056 Epoch 10 Epoch Training loss: 0.6644753124564886 Validation RMSE 0.2485458254814148 Epoch 20 Epoch Training loss: 0.44072350673377514 Validation RMSE 0.24397364258766174 Epoch 30 Epoch Training loss: 0.30809641908854246 Validation RMSE 0.23921476304531097 Epoch 40 Epoch Training loss: 0.2581821624189615 Validation RMSE 0.23755817115306854 Epoch 50 Epoch Training loss: 0.2343015754595399 Validation RMSE 0.2378583550453186 Epoch 60 Epoch Training loss: 0.21546931378543377 Validation RMSE 0.23444722592830658 Epoch 70 Epoch Training loss: 0.1489065163768828 Validation RMSE 0.22909393906593323 Epoch 80 Epoch Training loss: 0.13102087704464793 Validation RMSE 0.22927513718605042 Epoch 90 Epoch Training loss: 0.1315031824633479 Validation RMSE 0.22924456000328064 Epoch 100 Epoch Training loss: 0.10712239099666476 Validation RMSE 0.22807437181472778 Epoch 110 Epoch Training loss: 0.0968517349101603 Validation RMSE 0.22627593576908112 Epoch 120 Epoch Training loss: 0.0871338949073106 Validation RMSE 0.2256203144788742 Epoch 130 Epoch Training loss: 0.08571650320664048 Validation RMSE 0.22361905872821808 Epoch 140 Epoch Training loss: 0.07981432229280472 Validation RMSE 0.22545106709003448 Epoch 150 Epoch Training loss: 0.06883806944824755 Validation RMSE 0.22331561148166656 Epoch 160 Epoch Training loss: 0.06661694310605526 Validation RMSE 0.22438278794288635 Epoch 170 Epoch Training loss: 0.06674264022149146 Validation RMSE 0.22273825109004974 Epoch 180 Epoch Training loss: 0.05862601473927498 Validation RMSE 0.2209535837173462 
Epoch 190 Epoch Training loss: 0.05964221083559096 Validation RMSE 0.22113744914531708 Epoch 200 Epoch Training loss: 0.05568545078858733 Validation RMSE 0.22159892320632935 Best Performance on Validation set achived till now : 0.22034484
# First create model and then run with function
# EXPERT ADVICE: WHEN VALIDATION LOSS NEITHER INCREASES NOR DECREASES, REDUCE YOUR LR
# Second run of the same experiment with a smaller epoch budget (100 epochs).
net = None  # drop any previous model reference so its memory can be reclaimed
net = q3_nn(input_dim = 513).to(device)
net.apply(xavier_weights)  # Xavier init — `xavier_weights` is defined elsewhere in this notebook
torch.cuda.empty_cache()
print(net)
torch.cuda.empty_cache()
optimizer = torch.optim.Adam(net.parameters(), lr = 0.001)
torch.cuda.empty_cache()
loss_func = nn.MSELoss()  # regression on spectrogram magnitudes
torch.cuda.empty_cache()
# Results are bound to q3_net1/q3_weights1 so they don't clobber the 200-epoch run.
q3_net1, q3_weights1 = run_network(net, epochs = 100, loss_criteria = loss_func, optimizer = optimizer)
# Listen to the noisy test recording first.
IPython.display.Audio("/content/drive/My Drive/Projects and research stuffs/DLS Assignments/A2_data/test_x_01.wav")
s, sr_test = librosa.load('/content/drive/My Drive/Projects and research stuffs/DLS Assignments/A2_data/test_x_01.wav', sr = None)
S_test = librosa.stft(s, n_fft = 1024, hop_length = 512)
ST_test = S_test.T
ST_test_abs = np.abs(ST_test)
n_frames = 20
total_data = ST_test_abs.shape[0] - n_frames
# Sliding window: magnitude frames [k, k+20) form one input sample whose target
# is frame k+20, so the first 20 frames get no prediction and are dropped below.
X_data = np.array([ST_test_abs[k:k + n_frames] for k in range(total_data)])
tensor_test = torch.tensor(X_data)
print('Test data shape ', tensor_test.shape)
test_dataset = torch.utils.data.TensorDataset(tensor_test)
# One batch holding every window — a single forward pass covers the whole file.
test_loader = torch.utils.data.DataLoader(dataset=test_dataset,
                                          batch_size=len(test_dataset),
                                          shuffle=False)
q3_net.eval()
with torch.no_grad():
    for (frames,) in test_loader:
        # Add the channel axis Conv2d expects: (batch, 1, 20, 513).
        test_preds = q3_net(frames[:, None, :, :].to(device))
test_preds = test_preds.T
print(test_preds.shape)
# Re-attach the noisy signal's phase to the predicted magnitudes:
# unit phasors = complex STFT divided by its own magnitude.
# NOTE(review): this divides by |S|, which is NaN-prone if a bin is exactly 0.
noisy_tail = S_test.T[20:].T  # skip the 20 frames that have no prediction
Preds_noabs = (noisy_tail / np.abs(noisy_tail)) * np.abs(test_preds.cpu().numpy())
# Recover the time-domain signal via inverse STFT and write it out for listening.
S_hat_test = librosa.istft(stft_matrix = Preds_noabs, hop_length = 512)
sf.write('/content/drive/My Drive/Projects and research stuffs/DLS Assignments/q3_test_s_01_recons.wav', S_hat_test, sr_test)
IPython.display.Audio("/content/drive/My Drive/Projects and research stuffs/DLS Assignments/q3_test_s_01_recons.wav")
Test data shape torch.Size([122, 20, 513]) torch.Size([513, 122])
# Same denoising pipeline as above, applied to the second test recording.
IPython.display.Audio("/content/drive/My Drive/Projects and research stuffs/DLS Assignments/A2_data/test_x_02.wav")
s, sr_test = librosa.load('/content/drive/My Drive/Projects and research stuffs/DLS Assignments/A2_data/test_x_02.wav', sr = None)
S_test = librosa.stft(s, n_fft = 1024, hop_length = 512)
ST_test = S_test.T
ST_test_abs = np.abs(ST_test)
n_frames = 20
total_data = ST_test_abs.shape[0] - n_frames
# Each input sample is a 20-frame window of magnitudes; the first 20 frames
# therefore never get a prediction and are dropped below.
X_data = np.array([ST_test_abs[k:k + n_frames] for k in range(total_data)])
tensor_test = torch.tensor(X_data)
print('Test data shape ', tensor_test.shape)
test_dataset = torch.utils.data.TensorDataset(tensor_test)
# One batch holding every window — a single forward pass covers the whole file.
test_loader = torch.utils.data.DataLoader(dataset=test_dataset,
                                          batch_size=len(test_dataset),
                                          shuffle=False)
q3_net.eval()
with torch.no_grad():
    for (frames,) in test_loader:
        # Add the channel axis Conv2d expects: (batch, 1, 20, 513).
        test_preds = q3_net(frames[:, None, :, :].to(device))
test_preds = test_preds.T
print(test_preds.shape)
# Combine the noisy phase (unit phasors) with the predicted magnitudes.
# NOTE(review): this divides by |S|, which is NaN-prone if a bin is exactly 0.
noisy_tail = S_test.T[20:].T  # skip the 20 frames that have no prediction
Preds_noabs = (noisy_tail / np.abs(noisy_tail)) * np.abs(test_preds.cpu().numpy())
# Recover the time-domain signal via inverse STFT and write it out for listening.
S_hat_test = librosa.istft(stft_matrix = Preds_noabs, hop_length = 512)
sf.write('/content/drive/My Drive/Projects and research stuffs/DLS Assignments/q3_test_s_02_recons.wav', S_hat_test, sr_test)
IPython.display.Audio("/content/drive/My Drive/Projects and research stuffs/DLS Assignments/q3_test_s_02_recons.wav")
Test data shape torch.Size([360, 20, 513]) torch.Size([513, 360])